169914d49aa5dfc562fb399ccb29c9b946120c18,ninio-http/src/main/java/com/davfx/ninio/http/WebsocketSocket.java,WebsocketSocket,WebsocketSocket,#HttpClient#String#Address#Connecting#Closing#Failing#Receiver#Buffering#,120

Before Change


			connecting.connected(this, connectAddress);
		}
		
		sender = httpClient.request()
			.failing(failing)
			.buffering(buffering)
			.receiving(new HttpReceiver() {
				private boolean opcodeRead = false;
				private int currentOpcode;
				private boolean lenRead = false;
				private boolean mustReadExtendedLen16;
				private boolean mustReadExtendedLen64;
				private long currentLen;
				private long currentRead;
				private boolean mustReadMask;
				private ByteBuffer currentExtendedLenBuffer;
				private byte[] currentMask;
				private ByteBuffer currentMaskBuffer;
				private int currentPosInMask;
				
				private long toPing = 0L;

				@Override
				public HttpContentReceiver received(Disconnectable disconnectable, HttpResponse response) {
					// We should check everything here (status code, header Sec-WebSocket-Accept, ...)
					if (response.status != 101) {
						if (failing != null) {
							failing.failed(new IOException("Could not connect to " + connectAddress + " [" + response.status + " " + response.reason + "]"));
						}
						return null;
					}
					
					return new HttpContentReceiver() {
						@Override
						public void received(ByteBuffer buffer) {
							while (buffer.hasRemaining()) {
								if (!opcodeRead && buffer.hasRemaining()) {
									int v = buffer.get() & 0xFF;
									if ((v & 0x80) != 0x80) {
										LOGGER.error("Current implementation handles only FIN packets");
										sender.cancel();
										if (closing != null) {
											closing.closed();
										}
										return;
									}
									currentOpcode = v & 0x0F;
									opcodeRead = true;
								}
						
								if (!lenRead && buffer.hasRemaining()) {
									int v = buffer.get() & 0xFF;
									int len = v & 0x7F;
									if (len <= 125) {
										currentLen = len;
										mustReadExtendedLen16 = false;
										mustReadExtendedLen64 = false;
									} else if (len == 126) {
										mustReadExtendedLen16 = true;
										mustReadExtendedLen64 = false;
										currentExtendedLenBuffer = ByteBuffer.allocate(2);
									} else {
										mustReadExtendedLen64 = true;
										mustReadExtendedLen16 = false;
										currentExtendedLenBuffer = ByteBuffer.allocate(8);
									}
									mustReadMask = ((v & 0x80) == 0x80);
									if (mustReadMask) {
										currentMask = new byte[4];
										currentMaskBuffer = ByteBuffer.wrap(currentMask);
										currentPosInMask = 0;
									}
									lenRead = true;
								}
								
								while (mustReadExtendedLen16 && buffer.hasRemaining()) {
									int v = buffer.get();
									currentExtendedLenBuffer.put((byte) v);
									if (currentExtendedLenBuffer.position() == 2) {
										currentExtendedLenBuffer.flip();
										currentLen = currentExtendedLenBuffer.getShort() & 0xFFFF;
										mustReadExtendedLen16 = false;
										currentExtendedLenBuffer = null;
									}
								}
								while (mustReadExtendedLen64 && buffer.hasRemaining()) {
									int v = buffer.get();
									currentExtendedLenBuffer.put((byte) v);
									if (currentExtendedLenBuffer.position() == 8) {
										currentExtendedLenBuffer.flip();
										currentLen = currentExtendedLenBuffer.getLong();
										mustReadExtendedLen64 = false;
										currentExtendedLenBuffer = null;
									}
								}
								while (mustReadMask && buffer.hasRemaining()) {
									int v = buffer.get();
									currentMaskBuffer.put((byte) v);
									if (currentMaskBuffer.position() == 4) {
										currentMaskBuffer = null;
										mustReadMask = false;
									}
								}
								
								if (opcodeRead && lenRead && !mustReadExtendedLen16 && !mustReadExtendedLen64 && !mustReadMask && buffer.hasRemaining() && (currentRead < currentLen)) {
									ByteBuffer partialBuffer;
									int len = (int) Math.min(buffer.remaining(), currentLen - currentRead);
									if (currentMask == null) {
										partialBuffer = buffer.duplicate();
										partialBuffer.limit(partialBuffer.position() + len);
										buffer.position(buffer.position() + len);
										currentRead += len;
									} else {
										partialBuffer = ByteBuffer.allocate(len);
										while (buffer.hasRemaining() && (currentRead < currentLen)) {
											int v = buffer.get() & 0xFF;
											v ^= currentMask[currentPosInMask];
											partialBuffer.put((byte) v);
											currentRead++;
											currentPosInMask = (currentPosInMask + 1) % 4;
										}
										partialBuffer.flip();
									}
									int opcode = currentOpcode;
									long frameLength = currentLen;

									if (currentRead == currentLen) {
										opcodeRead = false;
										lenRead = false;
										mustReadExtendedLen16 = false;
										mustReadExtendedLen64 = false;
										currentExtendedLenBuffer = null;
										mustReadMask = false;
										currentMaskBuffer = null;
										currentMask = null;
										currentRead = 0L;
									}
									
									if (opcode == 0x09) {
										if (toPing == 0L) {
											toPing = frameLength;
											sender.send(WebsocketUtils.headerOf(0x0A, frameLength));
										}

										toPing -= partialBuffer.remaining();
										sender.send(partialBuffer);
									} else if ((opcode == 0x01) || (opcode == 0x02)) {
										if (receiver != null) {
											receiver.received(WebsocketSocket.this, null, partialBuffer);
										}
									} else if (opcode == 0x08) {
										LOGGER.debug("Connection closed by peer");
										sender.cancel();
										if (closing != null) {
											closing.closed();
										}
										break;
									}
								}
							}
						}
						
						@Override
						public void ended() {
							LOGGER.debug("Connection abruptly closed by peer");
							sender.cancel();
							if (closing != null) {
								closing.closed();
							}
						}
					};
				}
			})
			.build(request);
	}

	@Override

After Change


			.build()
		);

		HttpRequestBuilder b = httpClient.request();
		sender = b.build(request);
		b.receive(new HttpReceiver() {
			private boolean opcodeRead = false;
			private int currentOpcode;
			private boolean lenRead = false;
			private boolean mustReadExtendedLen16;
			private boolean mustReadExtendedLen64;
			private long currentLen;
			private long currentRead;
			private boolean mustReadMask;
			private ByteBuffer currentExtendedLenBuffer;
			private byte[] currentMask;
			private ByteBuffer currentMaskBuffer;
			private int currentPosInMask;
			
			private long toPing = 0L;

			@Override
			public HttpContentReceiver received(final HttpResponse response) {
				// We should check everything here (status code, header Sec-WebSocket-Accept, ...)
				if (response.status != 101) {
					queue.execute(new Runnable() {
						@Override
						public void run() {
							if (closed) {
								return;
							}
							if (connection != null) {
								closed = true;
								connection.failed(new IOException("[" + response.status + " " + response.reason + "]"));
							}
						}
					});
					return null;
				}
				
				return new HttpContentReceiver() {
					@Override
					public void received(ByteBuffer buffer) {
						SendCallback sendCallback = new SendCallback() {
							@Override
							public void failed(IOException e) {
								sender.cancel();
							}
							@Override
							public void sent() {
							}
						};
						
						while (buffer.hasRemaining()) {
							if (!opcodeRead && buffer.hasRemaining()) {
								int v = buffer.get() & 0xFF;
								if ((v & 0x80) != 0x80) {
									LOGGER.error("Current implementation handles only FIN packets");
									sender.cancel();
									connection.closed();
									return;
								}
								currentOpcode = v & 0x0F;
								opcodeRead = true;
							}
					
							if (!lenRead && buffer.hasRemaining()) {
								int v = buffer.get() & 0xFF;
								int len = v & 0x7F;
								if (len <= 125) {
									currentLen = len;
									mustReadExtendedLen16 = false;
									mustReadExtendedLen64 = false;
								} else if (len == 126) {
									mustReadExtendedLen16 = true;
									mustReadExtendedLen64 = false;
									currentExtendedLenBuffer = ByteBuffer.allocate(2);
								} else {
									mustReadExtendedLen64 = true;
									mustReadExtendedLen16 = false;
									currentExtendedLenBuffer = ByteBuffer.allocate(8);
								}
								mustReadMask = ((v & 0x80) == 0x80);
								if (mustReadMask) {
									currentMask = new byte[4];
									currentMaskBuffer = ByteBuffer.wrap(currentMask);
									currentPosInMask = 0;
								}
								lenRead = true;
							}
							
							while (mustReadExtendedLen16 && buffer.hasRemaining()) {
								int v = buffer.get();
								currentExtendedLenBuffer.put((byte) v);
								if (currentExtendedLenBuffer.position() == 2) {
									currentExtendedLenBuffer.flip();
									currentLen = currentExtendedLenBuffer.getShort() & 0xFFFF;
									mustReadExtendedLen16 = false;
									currentExtendedLenBuffer = null;
								}
							}
							while (mustReadExtendedLen64 && buffer.hasRemaining()) {
								int v = buffer.get();
								currentExtendedLenBuffer.put((byte) v);
								if (currentExtendedLenBuffer.position() == 8) {
									currentExtendedLenBuffer.flip();
									currentLen = currentExtendedLenBuffer.getLong();
									mustReadExtendedLen64 = false;
									currentExtendedLenBuffer = null;
								}
							}
							while (mustReadMask && buffer.hasRemaining()) {
								int v = buffer.get();
								currentMaskBuffer.put((byte) v);
								if (currentMaskBuffer.position() == 4) {
									currentMaskBuffer = null;
									mustReadMask = false;
								}
							}
							
							if (opcodeRead && lenRead && !mustReadExtendedLen16 && !mustReadExtendedLen64 && !mustReadMask && buffer.hasRemaining() && (currentRead < currentLen)) {
								final ByteBuffer partialBuffer;
								int len = (int) Math.min(buffer.remaining(), currentLen - currentRead);
								if (currentMask == null) {
									partialBuffer = buffer.duplicate();
									partialBuffer.limit(partialBuffer.position() + len);
									buffer.position(buffer.position() + len);
									currentRead += len;
								} else {
									partialBuffer = ByteBuffer.allocate(len);
									while (buffer.hasRemaining() && (currentRead < currentLen)) {
										int v = buffer.get() & 0xFF;
										v ^= currentMask[currentPosInMask];
										partialBuffer.put((byte) v);
										currentRead++;
										currentPosInMask = (currentPosInMask + 1) % 4;
									}
									partialBuffer.flip();
								}
								int opcode = currentOpcode;
								long frameLength = currentLen;

								if (currentRead == currentLen) {
									opcodeRead = false;
									lenRead = false;
									mustReadExtendedLen16 = false;
									mustReadExtendedLen64 = false;
									currentExtendedLenBuffer = null;
									mustReadMask = false;
									currentMaskBuffer = null;
									currentMask = null;
									currentRead = 0L;
								}
								
								if (opcode == 0x09) {
									if (toPing == 0L) {
										toPing = frameLength;
										sender.send(WebsocketUtils.headerOf(0x0A, frameLength), sendCallback);
									}

									toPing -= partialBuffer.remaining();
									sender.send(partialBuffer, sendCallback);
								} else if ((opcode == 0x01) || (opcode == 0x02)) {
									queue.execute(new Runnable() {
										@Override
										public void run() {
											if (closed) {
												return;
											}
											if (connection != null) {
												connection.received(null, partialBuffer);
											}
										}
									});
								} else if (opcode == 0x08) {
									LOGGER.debug("Connection closed by peer");
									sender.cancel();
									queue.execute(new Runnable() {
										@Override
										public void run() {
											if (closed) {
												return;
											}
											if (connection != null) {
												closed = true;
												connection.closed();
											}
										}
									});
									break;
								}
							}
						}
					}
					
					@Override
					public void ended() {
						LOGGER.debug("Connection abruptly closed by peer");
						sender.cancel();
						queue.execute(new Runnable() {
							@Override
							public void run() {
								if (closed) {
									return;
								}
								if (connection != null) {
									closed = true;
									connection.closed();
								}
							}
						});
					}
				};
			}
			
			@Override
			public void failed(final IOException ioe) {
				queue.execute(new Runnable() {
					@Override
					public void run() {
						if (closed) {
							return;
						}
						if (connection != null) {
							closed = true;
							connection.failed(ioe);
						}
					}
				});
			}
		});
	}

	@Override